package com.xiaofan.apitest.tabletest

import com.xiaofan.apitest.source.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{FieldExpression, Table, _}

object Example {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputPath = "D:\\big-data\\code\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    val dataStream: DataStream[SensorReading] = inputStream.map(
      data => {
        val arr: Array[String] = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
    )

    // 创建表环境
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    val dataTable: Table = tableEnv.fromDataStream(dataStream)

    val resultTable: Table = dataTable
      .select($"id", $"temperature")
      .filter($"id" === "sensor_1")

    tableEnv.toAppendStream[(String, Double)](resultTable).print("table result")

    // sql
    tableEnv.createTemporaryView("dataTable", dataStream)
    val sql: String = "select id, temperature from dataTable where id = 'sensor_1'"
    val dataSqlTable: Table = tableEnv.sqlQuery(sql)

    tableEnv.toAppendStream[(String, Double)](dataSqlTable).print("sql result")

    env.execute("table test")
  }
}
